package endpoint

import (
	"log"
	"reflect"
	"strings"
	"sync"

	"gf-mysql-transfer/internal/global"
	"gf-mysql-transfer/internal/metrics"
	"gf-mysql-transfer/internal/model"
	"gf-mysql-transfer/internal/service/luaengine"
	"gf-mysql-transfer/internal/utility/mlog"
	"github.com/go-mysql-org/go-mysql/canal"
	"github.com/go-mysql-org/go-mysql/mysql"
	"github.com/gogf/gf/v2/database/gdb"
	"github.com/gogf/gf/v2/errors/gerror"
	"github.com/gogf/gf/v2/frame/g"
	"github.com/gogf/gf/v2/os/gtime"
)

// DbEndpoint is the endpoint that writes row changes into a target database
// through GoFrame's gdb layer.
//
// NOTE(review): none of the methods visible in this part of the file acquire
// the locks below; confirm whether they are used elsewhere or are leftovers.
// Because the struct embeds mutexes it must not be copied; use *DbEndpoint.
type DbEndpoint struct {
	lock      sync.Mutex
	collLock  sync.RWMutex
	retryLock sync.Mutex
}

// newDbEndpoint allocates a DbEndpoint and immediately establishes its
// database configuration via ConnDB, which panics when the database cannot
// be reached.
func newDbEndpoint() *DbEndpoint {
	endpoint := new(DbEndpoint)
	endpoint.ConnDB()
	return endpoint
}

// ConnDB registers every database group found in the global configuration
// with gdb and then verifies connectivity with Ping.
//
// It panics when the ping fails. The panic message carries the underlying
// error text so the cause of a failed start-up (bad credentials, unreachable
// host, ...) is not lost.
func (s *DbEndpoint) ConnDB() {
	cfg := global.Cfg()
	for group, configNode := range cfg.Db {
		// Each configured node becomes a single-node gdb config group.
		gdb.SetConfigGroup(group, gdb.ConfigGroup{
			configNode,
		})
	}

	if err := s.Ping(); err != nil {
		panic("db连接失败: " + err.Error())
	}
}

// Connect checks that the database is reachable by delegating to Ping and
// returns whatever error Ping reports.
func (s *DbEndpoint) Connect() error {
	if err := s.Ping(); err != nil {
		return err
	}
	return nil
}

// Ping calls PingMaster on the database returned by g.DB() (no group name
// given) and returns its error.
func (s *DbEndpoint) Ping() (err error) {
	err = g.DB().PingMaster()
	return err
}

// Consume applies a batch of row changes to the target database and logs how
// many rows were handled and how long it took, in milliseconds.
//
// The from position is not used by this endpoint. The first error reported by
// handlerData is returned unchanged and no summary is logged in that case.
func (s *DbEndpoint) Consume(from mysql.Position, rows []*model.RowRequest) (err error) {
	begin := gtime.TimestampMilli()

	if err = s.handlerData(rows); err != nil {
		return err
	}

	elapsed := gtime.TimestampMilli() - begin
	mlog.InfoF("处理完成 %d 条数据，在 %d 毫秒  内完成", len(rows), elapsed)
	return nil
}

// handlerData applies rows to the target database in order and stops at the
// first failure.
//
// For every row:
//   - the rule is looked up by row.RuleKey; rows whose column count differs
//     from rule.TableColumnSize are logged and skipped;
//   - the per-action monitoring counter is updated;
//   - when the rule has Lua enabled, the operations returned by
//     luaengine.DoDbOps are executed one by one;
//   - otherwise the row's own action is applied to rule.TargetGroup /
//     rule.TargetTable. For updates only the columns whose value differs
//     from the old row are written, and the update is skipped entirely when
//     nothing differs.
//
// It returns the first error from the Lua engine or from a database
// operation (after logging it), or nil when every row was handled.
func (s *DbEndpoint) handlerData(rows []*model.RowRequest) (err error) {
	for _, row := range rows {
		// NOTE(review): the second result of RuleIns is discarded; confirm
		// that a row can never reference a rule key that is not registered.
		rule, _ := global.RuleIns(row.RuleKey)
		if rule.TableColumnSize != len(row.Row) {
			mlog.WarningF("%s schema mismatching", row.RuleKey)
			continue
		}

		// Update the monitoring counter for this action.
		metrics.UpdateActionNum(row.Action, row.RuleKey)

		kvm := rowMap(row, rule, true)

		if rule.LuaEnable() {
			oldMap := oldRowMap(row, rule, true)
			ls, luaErr := luaengine.DoDbOps(kvm, oldMap, row.Action, rule)
			if luaErr != nil {
				// ErrorF, not Error: the message is a format string.
				mlog.ErrorF("lua 脚本执行失败 : %s ", gerror.Stack(luaErr))
				return luaErr
			}
			for _, resp := range ls {
				var opErr error
				switch resp.Action {
				case canal.InsertAction:
					opErr = s.insert(resp.Group, resp.TableName, resp.Table)
				case canal.UpdateAction:
					opErr = s.update(resp.Group, resp.TableName, resp.Table, resp.Where)
				case canal.DeleteAction:
					opErr = s.delete(resp.Group, resp.TableName, resp.Table, resp.Where)
				}
				if opErr != nil {
					mlog.ErrorF("执行%s操作失败 : %s ", resp.Action, gerror.Stack(opErr))
					return opErr
				}
			}
			continue
		}

		switch row.Action {
		case canal.InsertAction:
			err = s.insert(rule.TargetGroup, rule.TargetTable, kvm)
		case canal.UpdateAction:
			oldMap := oldRowMap(row, rule, true)
			updateMap := make(map[string]interface{}, len(kvm))
			for k, v := range kvm {
				// reflect.DeepEqual instead of !=: comparing interface values
				// whose dynamic type is uncomparable (slices, maps) with !=
				// panics at run time.
				if !reflect.DeepEqual(v, oldMap[k]) {
					updateMap[k] = v
				}
			}
			if len(updateMap) == 0 {
				// No mapped column changed, so there is nothing to write;
				// an UPDATE without data would be rejected by the database
				// layer.
				continue
			}
			err = s.update(rule.TargetGroup, rule.TargetTable, updateMap, primaryKeyMap(row, rule))
		case canal.DeleteAction:
			err = s.delete(rule.TargetGroup, rule.TargetTable, kvm, primaryKeyMap(row, rule))
		}
		if err != nil {
			mlog.ErrorF("执行%s操作失败 : %s ", row.Action, gerror.Stack(err))
			return err
		}
	}
	return nil
}

// Stock applies rows to the target database and returns the number of rows
// it was given.
//
// Rows whose column count differs from rule.TableColumnSize are logged and
// skipped, but still counted in the returned total. When the rule has Lua
// enabled, the operations returned by luaengine.DoDbOps are executed;
// otherwise the row's own action is applied to rule.TargetGroup /
// rule.TargetTable. For updates only the columns whose value differs from the
// old row are written, and the update is skipped when nothing differs.
//
// A Lua failure is logged, stops processing and makes Stock return 0.
// A failed database operation panics with the underlying error.
func (s *DbEndpoint) Stock(rows []*model.RowRequest) int64 {
	for _, row := range rows {
		// NOTE(review): the second result of RuleIns is discarded; confirm
		// that a row can never reference a rule key that is not registered.
		rule, _ := global.RuleIns(row.RuleKey)
		if rule.TableColumnSize != len(row.Row) {
			mlog.WarningF("%s schema mismatching", row.RuleKey)
			continue
		}

		kvm := rowMap(row, rule, true)

		if rule.LuaEnable() {
			oldMap := oldRowMap(row, rule, true)
			ls, luaErr := luaengine.DoDbOps(kvm, oldMap, row.Action, rule)
			if luaErr != nil {
				log.Println("Lua 脚本执行失败!!! ,详情请参见日志")
				mlog.ErrorF("lua 脚本执行失败 : %s ", gerror.Stack(luaErr))
				// Abort the whole batch and report nothing as processed.
				return 0
			}

			for _, resp := range ls {
				var opErr error
				switch resp.Action {
				case canal.InsertAction:
					opErr = s.insert(resp.Group, resp.TableName, resp.Table)
				case canal.UpdateAction:
					opErr = s.update(resp.Group, resp.TableName, resp.Table, resp.Where)
				case canal.DeleteAction:
					opErr = s.delete(resp.Group, resp.TableName, resp.Table, resp.Where)
				}
				if opErr != nil {
					panic(opErr)
				}
			}
			continue
		}

		var err error
		switch row.Action {
		case canal.InsertAction:
			err = s.insert(rule.TargetGroup, rule.TargetTable, kvm)
		case canal.UpdateAction:
			oldMap := oldRowMap(row, rule, true)
			updateMap := make(map[string]interface{}, len(kvm))
			for k, v := range kvm {
				// reflect.DeepEqual instead of !=: comparing interface values
				// whose dynamic type is uncomparable (slices, maps) with !=
				// panics at run time.
				if !reflect.DeepEqual(v, oldMap[k]) {
					updateMap[k] = v
				}
			}
			if len(updateMap) == 0 {
				// No mapped column changed, so there is nothing to write;
				// an UPDATE without data would be rejected by the database
				// layer.
				continue
			}
			err = s.update(rule.TargetGroup, rule.TargetTable, updateMap, primaryKeyMap(row, rule))
		case canal.DeleteAction:
			err = s.delete(rule.TargetGroup, rule.TargetTable, kvm, primaryKeyMap(row, rule))
		}
		if err != nil {
			panic(err)
		}
	}

	return int64(len(rows))
}

// insert writes kvm as one record into table of the given database group.
//
// When the group's configured type is "mysql" (compared case-insensitively)
// the record is written with Save; for every other type it is written with
// Insert. Only the error of the chosen operation is returned.
func (s *DbEndpoint) insert(group string, table string, kvm map[string]interface{}) (err error) {
	dbType := strings.ToLower(g.DB(group).GetConfig().Type)
	record := g.DB(group).Model(table).Data(kvm)

	switch dbType {
	case "mysql":
		_, err = record.Save()
	default:
		_, err = record.Insert()
	}
	return err
}

// update writes updateMap to the rows of table, in the given database group,
// that match the where conditions. The result is discarded; only the error
// is returned.
func (s *DbEndpoint) update(group string, table string, updateMap map[string]interface{}, where map[string]interface{}) error {
	query := g.DB(group).Model(table).Where(where)
	if _, err := query.Update(updateMap); err != nil {
		return err
	}
	return nil
}

// delete removes rows from table in the given database group. The result is
// discarded; only the error is returned.
//
// NOTE(review): data is handed to Model.Delete, whose arguments GoFrame
// documents as WHERE conditions, so the row values appear to act as extra
// filters on top of where rather than as payload — confirm this is intended.
func (s *DbEndpoint) delete(group string, table string, data map[string]interface{}, where map[string]interface{}) error {
	query := g.DB(group).Model(table).Where(where)
	if _, err := query.Delete(data); err != nil {
		return err
	}
	return nil
}

// Close is a no-op: this endpoint releases nothing here.
func (s *DbEndpoint) Close() {
}
